Linear Regression
Problem Statement
The input data set contains details of various car models.
Based on the information provided, the goal is to build a model
that predicts the miles-per-gallon (MPG) of a given car model.
Techniques used:
1. Linear Regression (multivariate)
2. Data Imputation - replacing missing ("?") values with a numeric default
3. Variable Reduction - keeping only the relevant features
# -*- coding: utf-8 -*-
import os
os.chdir("/home/cloudops/spark")
os.getcwd()  # verify the working directory
# Load the CSV file into an RDD
autoData = sc.textFile("data/auto-miles-per-gallon.csv")
autoData.cache()
# auto-miles-per-gallon.csv MapPartitionsRDD[1] at textFile
# Remove the header line (the only line containing "CYLINDERS")
dataLines = autoData.filter(lambda x: "CYLINDERS" not in x)
dataLines.count() # 398
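# An alternative, order-based way to drop the header (assumption: the
# header is always the first line of the file):
# header = autoData.first()
# dataLines = autoData.filter(lambda x: x != header)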
# Convert the RDD into an RDD of DenseVectors. As part of this exercise:
# 1. Remove unwanted columns
# 2. Replace non-numeric values ("?") with numeric ones
# NOTE: Install NumPy first:
# > pip3 install numpy
import math
# from pyspark.mllib.linalg import Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
# Use a default average HorsePower (HP) for missing values;
# this broadcast constant could also be calculated from the existing values
avgHP = sc.broadcast(80.0)
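# A sketch of computing that broadcast value from the data instead of
# hardcoding 80.0 (column 3 is HORSEPOWER; "?" rows are skipped):
# hpValues = dataLines.map(lambda x: x.split(",")[3]) \
#                     .filter(lambda v: v != "?") \
#                     .map(float)
# avgHP = sc.broadcast(hpValues.mean())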
def transformToNumeric(inputStr):
    attList = inputStr.split(",")
    # Replace "?" values with the broadcast average HP
    hpValue = attList[3]
    if hpValue == "?":
        hpValue = avgHP.value
    # Filter out columns not wanted at this stage
    values = Vectors.dense([float(attList[0]),
                            float(attList[1]),
                            float(hpValue),
                            float(attList[5]),
                            float(attList[6])])
    return values
# Keep only MPG, CYLINDERS, HP, ACCELERATION and MODELYEAR
autoVectors = dataLines.map(transformToNumeric)
autoVectors.collect()
# [DenseVector([18.0, 8.0, 130.0, 12.0, 70.0]),
# DenseVector([15.0, 8.0, 165.0, 11.5, 70.0]),
# DenseVector([18.0, 8.0, 150.0, 11.0, 70.0]),
# DenseVector([16.0, 8.0, 150.0, 12.0, 70.0]),
# . . .
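# Note: collect() pulls the whole RDD to the driver, which is fine for
# this small data set; on larger data, prefer a bounded sample, e.g.:
# autoVectors.take(5)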
# =====================================
# Perform Statistical Analysis
# =====================================
# NOTE: this uses the RDD-based MLlib library
from pyspark.mllib.stat import Statistics
autoStats = Statistics.colStats(autoVectors)
autoStats.mean()
# array([ 23.51457286, 5.45477387, 104.10050251, 15.56809045,
# 76.01005025])
autoStats.variance()
# array([ 61.08961077, 2.89341544, 1468.09062947, 7.60484823,
# 13.67244282])
autoStats.min()
# (the per-column minimums)
autoStats.max()
# array([ 46.6, 8. , 230. , 24.8, 82. ])
Statistics.corr(autoVectors)
# array([[ 1. , -0.77539629, -0.77463084, 0.42028891, 0.57926713],
# [-0.77539629, 1. , 0.84275215, -0.50541949, -0.3487458 ],
# [-0.77463084, 0.84275215, 1. , -0.68829885, -0.41559383],
# [ 0.42028891, -0.50541949, -0.68829885, 1. , 0.28813695],
# [ 0.57926713, -0.3487458 , -0.41559383, 0.28813695, 1. ]])
# 0.42028891 - low correlation between MPG and ACCELERATION,
# so the ACCELERATION column (index 3) can be dropped
# =====================================
# Create SQL Context for ML Lib
# =====================================
# Transform to a Data Frame for input to Machine Learning
# Drop columns that are not required (low correlation)
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)  # in Spark 2.x, SparkSession is the preferred entry point
def transformToLabeledPoint(inStr):
    '''MPG (element 0) becomes the label; CYLINDERS, HP and MODELYEAR
    become the features (ACCELERATION is dropped).'''
    lp = (float(inStr[0]),
          Vectors.dense([inStr[1], inStr[2], inStr[4]]))
    return lp
# new RDD
autoLp = autoVectors.map(transformToLabeledPoint)
# new DataFrame from the RDD (for the DataFrame-based ML library, not MLlib!)
autoDF = sqlContext.createDataFrame(autoLp, ["label", "features"])
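# Quick schema check - the "features" column must be the ml vector type
# for the LinearRegression fit below; expected shape, roughly:
autoDF.printSchema()
# root
#  |-- label: double (nullable = true)
#  |-- features: vector (nullable = true)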
# Display top 10 rows
autoDF.select("label", "features").show(10)
# +-----+----------------+
# |label| features|
# +-----+----------------+
# | 18.0|[8.0,130.0,70.0]|
# | 15.0|[8.0,165.0,70.0]|
# | 18.0|[8.0,150.0,70.0]|
# | 16.0|[8.0,150.0,70.0]|
# | 17.0|[8.0,140.0,70.0]|
# | 15.0|[8.0,198.0,70.0]|
# | 14.0|[8.0,220.0,70.0]|
# | 14.0|[8.0,215.0,70.0]|
# | 14.0|[8.0,225.0,70.0]|
# | 15.0|[8.0,190.0,70.0]|
# +-----+----------------+
# =====================================
# Find Correlations
# =====================================
# size of the "features" vector column
numFeatures = autoDF.take(1)[0].features.size # 3
# labelRDD = autoDF.map(lambda lp: float(lp.label))
# ERROR: 'DataFrame' object has no attribute 'map'
# Keep in mind that MLlib is built around RDDs,
# while ML is generally built around DataFrames.
# You can't map a DataFrame, but you can convert it to an RDD
# and map that with spark_df.rdd.map().
# Prior to Spark 2.0, spark_df.map aliased to spark_df.rdd.map();
# with Spark 2.0, you must explicitly call .rdd first.
# a separate label RDD for the correlation report
labelRDD = autoDF.rdd.map(lambda lp: float(lp.label))
for i in range(numFeatures):
    featureRDD = autoDF.rdd.map(lambda lp: lp.features[i])
    corr = Statistics.corr(labelRDD, featureRDD, 'pearson')
    print('%d\t%g' % (i, corr))
# 0 -0.775396
# 1 -0.774631
# 2 0.579267
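# On Spark 2.2+, the DataFrame-based ML API can report feature-to-feature
# correlations directly (note: this covers the "features" column only,
# not the label):
# from pyspark.ml.stat import Correlation
# print(Correlation.corr(autoDF, "features").head()[0])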
# =====================================
# Split DataFrame into Training and Testing data
# =====================================
# A 70/30 split is typical, but this data set is small, so use 90/10
(trainingData, testData) = autoDF.randomSplit([0.9, 0.1])
trainingData.count() # 366 (counts vary from run to run)
testData.count()     # 32
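# For a reproducible split, pass a seed (the value here is arbitrary):
# (trainingData, testData) = autoDF.randomSplit([0.9, 0.1], seed=42)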
# =====================================
# Build the model on Training Data
# =====================================
# Linear Regression
# =====================================
from pyspark.ml.regression import LinearRegression
# the default is 100 iterations; 10 suffices for this small data set
lr = LinearRegression(maxIter=10)
# Fit model to the training data
lrModel = lr.fit(trainingData)
# ERROR (seen when the old mllib Vectors class was used):
# IllegalArgumentException: 'requirement failed: Column features must be of type
# struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually
# struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.'
# (the expected and actual structs print identically because the mllib and
# ml vector types share the same schema - only the Python class differs)
# trainingData -> DataFrame[label: double, features: vector]
# Workaround: instead of
#   from pyspark.mllib.linalg import Vectors
# use:
#   from pyspark.ml.linalg import Vectors
#   from pyspark.ml.feature import VectorAssembler
# See:
# https://spark.apache.org/docs/latest/ml-features.html#vectorassembler
# OK
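# Sketch (not used above): VectorAssembler can build the "features" column
# from plain numeric DataFrame columns; the column names below are
# hypothetical:
# assembler = VectorAssembler(
#     inputCols=["CYLINDERS", "HORSEPOWER", "MODELYEAR"],
#     outputCol="features")
# assembled = assembler.transform(numericDF)  # numericDF: a hypothetical DataFrame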
# Multiple regression formula: Y = b + a1*X1 + a2*X2 + a3*X3
# the slope coefficients a1, a2, a3
print("Coefficients: " + str(lrModel.coefficients))
# Coefficients: [-1.996622715254686,-0.05670662125694535,0.6372473703412905]
# b
print("Intercept: " + str(lrModel.intercept))
# Intercept: -8.121408295465546
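# Sanity check: recompute one prediction by hand from the formula above
# (uses the lrModel and testData already defined in this session)
row = testData.first()
manual = lrModel.intercept + sum(
    c * x for c, x in zip(lrModel.coefficients, row.features))
print(manual)  # should match lrModel.transform() for the same row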
# =====================================
# Predict on the Test Data
# =====================================
predictions = lrModel.transform(testData)
predictions.select("prediction", "label", "features").show()
# Predicted vs. actual MPG
# +------------------+-----+----------------+
# | prediction|label| features|
# +------------------+-----+----------------+
# | 8.321002336144055| 10.0|[8.0,215.0,70.0]|
# |12.954662267501302| 12.0|[8.0,167.0,73.0]|
# |10.942981450478428| 12.0|[8.0,180.0,71.0]|
# |13.918674828869374| 13.0|[8.0,150.0,73.0]|
# |12.784542403730466| 13.0|[8.0,170.0,73.0]|
# . . .
# |26.795409654500315| 24.0| [4.0,75.0,74.0]|
# |28.920503714037064| 24.5| [4.0,60.0,76.0]|
# |26.795409654500315| 25.0| [4.0,75.0,74.0]|
# +------------------+-----+----------------+
# =====================================
# Evaluate the Model
# =====================================
# the "label" column holds the target variable (MPG)
# metricName="r2" -> R^2, the coefficient of determination
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(predictionCol="prediction",
                                labelCol="label",
                                metricName="r2")
evaluator.evaluate(predictions)
# 0.7844712339243454 -> a good model (R^2 > 0.7)
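# The same evaluator class reports other metrics; a sketch for RMSE,
# reusing the predictions DataFrame from above:
rmseEvaluator = RegressionEvaluator(predictionCol="prediction",
                                    labelCol="label",
                                    metricName="rmse")
print(rmseEvaluator.evaluate(predictions))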